package com.gl.sass.mq.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import com.gl.sass.mq.producer.retry.LimitDelayQueue;
import com.gl.sass.mq.producer.retry.RetryMessage;

import java.util.Objects;
import java.util.Optional;

/**
 * 	重试监听器
 * @author xiehong
 *
 */
public class RetryListenser implements DisposableBean, Runnable, InitializingBean {

    private static final Logger log = LoggerFactory.getLogger(RetryListenser.class);

    private LimitDelayQueue<RetryMessage> limitDelayQueue;

    private RocketMQProducer rocketMQProducer;

    private int maxRetryTimes;

    private volatile boolean workCondition = true;

    public RetryListenser(LimitDelayQueue<RetryMessage> limitDelayQueue, RocketMQProducer rocketMQProducer,
                          int maxRetryTimes) {
        this.limitDelayQueue = limitDelayQueue;
        this.rocketMQProducer = rocketMQProducer;
        this.maxRetryTimes = maxRetryTimes;
    }

    @Override
    public void run() {
        log.info("MQ重试监听器启动: {}", workCondition);
        while (workCondition) {
            this.takeAndSend();
        }
    }

    @Override
    public void destroy() throws Exception {
        workCondition = false;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 注册当消息队列已满时触发的事件
        limitDelayQueue.registConsumer(this::sendFirst);
        Thread thread = new Thread(this);
        thread.start();
    }

    /**
     * 	获取可处理的重试事件 并发送
     * 	如果发送失败则再次放入重试队列
     */
    private void takeAndSend() {
        RetryMessage message = null;
        try {
            message = limitDelayQueue.take();
            if (Objects.isNull(message)) {
                return;
            }

            log.warn("消息发送重试：{}", message);
            rocketMQProducer.send(message.getMsg());
        } catch (Exception e) {
            log.warn("消息重发异常,放入队列等待再次重试,{} ,{}:", message, e.getMessage(), e);
            Optional.ofNullable(message).ifPresent(this::checkRetryAgain);
        }
    }

    private void checkRetryAgain(RetryMessage o) {
        if (o.getRetryTimes() >= maxRetryTimes) {
            log.warn("当前消息已重试达到最大次数，不再重试: maxRetryTimes={}, message={}", maxRetryTimes, o);
            return;
        }
        limitDelayQueue.add(o.retryAgain());
    }

    /**
     * 	队列已满时则放入立刻重试队列最近一个消息
     * 	失败不放入重试队列中
     *
     * @param o 消息
     */
    private void sendFirst(RetryMessage o) {
        log.error("队列已满, 取队列头部消息立刻发送出去：{}", Optional.ofNullable(o).map(RetryMessage::toPrint).orElse(null));
        if (Objects.isNull(o)) {
            return;
        }
        try {
            rocketMQProducer.send(o.getMsg());
        } catch (Exception e) {
            log.error("队列已满，发送消息也失败了: " + e.getMessage(), e);
        }
    }

}
